package com.da.core.util;

import java.util.*;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.function.*;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
 * @author da
 * @time 2023/10/13 上午 8:43
 */
public interface Seq<T> {
    void consume(Consumer<T> consumer);

    static <T> Seq<T> unit(T t) {
        return c -> c.accept(t);
    }

    /**
     * 将类型为T的流，转换为类型为E的流
     *
     * @param function 处理T返回E
     * @param <E>      返回流的类型
     * @return 数据流
     */
    default <E> Seq<E> map(Function<T, E> function) {
        return c -> consume(t -> c.accept(function.apply(t)));
    }

    /**
     * 分段map操作
     *
     * @param function      对前n个元素的处理
     * @param n             划分前n个元素
     * @param otherFunction 对后n个元素进行处理
     * @param <E>           返回数据流的类型
     * @return 数据流
     */
    default <E> Seq<E> map(Function<T, E> function, int n, Function<T, E> otherFunction) {
        return c -> {
            int[] size = {n};
            consume(t -> {
                if (size[0] > 0) {
                    size[0]--;
                    System.out.println("size " + t);
                    c.accept(function.apply(t));
                } else {
                    c.accept(otherFunction.apply(t));
                }
            });
        };
    }

    /**
     * 将每个元素展开为一个流之后再合并
     *
     * @param function 处理原始流返回对应的输出流
     * @param <E>      返回流的类型
     * @return 数据流
     */
    default <E> Seq<E> flatMap(Function<T, Seq<E>> function) {
        return c -> consume(t -> function.apply(t).consume(c));
    }

    /**
     * 元素过滤,把不符合条件的元素过滤掉
     *
     * @param predicate 判断条件
     * @return 数据流
     */
    @SuppressWarnings("all")
    default Seq<T> filter(Predicate<T> predicate) {
        return c -> consume(t -> {
            if (predicate.test(t)) {
                c.accept(t);
            }
        });
    }

    /**
     * 用来停止流
     * 由于Seq并不依赖iterator，所以必须通过异常实现中断
     */
    default void stop() {
        throw StopException.INSTANCE;
    }

    /**
     * 包裹需要停止的流
     *
     * @param consumer 用于包裹流
     */
    default void consumeTillStop(Consumer<T> consumer) {
        try {
            consume(consumer);
        } catch (StopException ignored) {

        }
    }

    /**
     * 获取前n个元素，后面的不要
     *
     * @param n 前n个元素
     * @return 数据流
     */
    default Seq<T> take(int n) {
        return c -> {
            int[] i = {n};
            consumeTillStop(t -> {
                if (i[0]-- > 0) {
                    c.accept(t);
                } else {
                    stop();
                }
            });
        };
    }

    /**
     * drop是与take对应的概念，丢弃前n个元素
     * 它并不涉及流的中断控制，反而更像是filter的变种，一种带有状态的filter
     *
     * @param n 前n个元素
     * @return 数据流
     */
    default Seq<T> drop(int n) {
        return c -> {
            int[] a = {n - 1};
            consume(t -> {
                if (a[0] < 0) {
                    c.accept(t);
                } else {
                    a[0]--;
                }
            });
        };
    }

    /**
     * 对流的某个元素添加一个操作consumer，但是不执行流
     *
     * @param consumer 操作
     * @return 数据流
     */
    default Seq<T> onEach(Consumer<T> consumer) {
        return c -> consume(consumer.andThen(c));
    }

    /**
     * 将一个流与一个可迭代的元素集合进行两两聚合操作，并将结果转换为新的流
     *
     * @param iterable 一个实现了Iterable接口的对象,它包含需要与之聚合的元素
     * @param function 用于聚合操作的二元函数,接受两个参数: 第一个参数是当前元素,第二个参数是iterable中的下一个元素
     * @param <E>      聚合操作的元素类型
     * @param <R>      返回流中元素类型和返回值类型
     * @return 数据流
     */
    default <E, R> Seq<R> zip(Iterable<E> iterable, BiFunction<T, E, R> function) {
        return c -> {
            Iterator<E> iterator = iterable.iterator();
            consumeTillStop(t -> {
                if (iterator.hasNext()) {
                    c.accept(function.apply(t, iterator.next()));
                } else {
                    stop();
                }
            });
        };
    }

    default String join(String sep) {
        StringJoiner joiner = new StringJoiner(sep);
        consume(t -> joiner.add(t.toString()));
        return joiner.toString();
    }

    /**
     * 把流转成List
     *
     * @return List
     */
    default List<T> toList() {
        List<T> list = new ArrayList<>();
        consume(list::add);
        return list;
    }

    /**
     * 把流转成Set
     *
     * @return Set
     */
    default Set<T> toSet() {
        Set<T> set = new HashSet<>();
        consume(set::add);
        return set;
    }

    /**
     * 把 Seq 转换成 Stream
     *
     * @param seq 要转换的 Seq
     * @param <T> 数据类型
     * @return Stream
     */
    static <T> Stream<T> stream(Seq<T> seq) {
        Iterator<T> iterator = new Iterator<T>() {

            @Override
            public boolean hasNext() {
                throw new NoSuchElementException();
            }

            @Override
            public T next() {
                throw new NoSuchElementException();
            }

            @Override
            public void forEachRemaining(Consumer<? super T> action) {
                seq.consume(action::accept);
            }
        };
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED),
                false
        );
    }

    /**
     * 流的缓存
     *
     * @return 缓存数组
     */
    default Seq<T> cache() {
        ArraySeq<T> arraySeq = new ArraySeq<>();
        consume(arraySeq::add);
        return arraySeq;
    }

    /**
     * 异步消费
     *
     * @param consumer 消费操作
     */
    default void asyncConsume(Consumer<T> consumer) {
        ForkJoinPool pool = ForkJoinPool.commonPool();
        map(t -> pool.submit(() -> consumer.accept(t)))
                .cache()
                .consume(ForkJoinTask::join);
    }

    /**
     * 并发流
     *
     * @return 并发流
     */
    default Seq<T> parallel() {
        ForkJoinPool pool = ForkJoinPool.commonPool();
        return c -> map(t -> pool.submit(() -> c.accept(t))).cache().consume(ForkJoinTask::join);
    }

    /**
     * 生成流
     *
     * @param values 元素列表
     * @return 数据流
     */
    @SafeVarargs
    static <T> Seq<T> of(T... values) {
        return c -> {
            for (T value : values) {
                c.accept(value);
            }
        };
    }

    /**
     * 元素去重
     *
     * @return 去重后的流
     */
    default Seq<T> distinct() {
        return c -> {
            for (T t : toSet()) {
                c.accept(t);
            }
        };
    }

    /**
     * reduce
     *
     * @param function 对元素的操作
     * @return 操作结果
     */
    default T reduce(BiFunction<T, T, T> function) {
        Optional<T> reduce = toList().stream().reduce(function::apply);
        return reduce.orElse(null);
    }

    /**
     * 生成从指定位置开始指定位置结束的Integer流
     *
     * @param start 开始位置
     * @param end   结束位置
     * @return Integer流
     */
    static Seq<Integer> range(int start, int end) {
        return c -> {
//            如果开始位置大于结束位置需要从大到小生成
            if (start > end) {
                for (int i = start; i > end; i--) {
                    c.accept(i);
                }
            } else {
                for (int i = start; i < end; i++) {
                    c.accept(i);
                }
            }

        };
    }

    /**
     * 生成0-n的Integer流
     *
     * @param n 结束位置
     * @return Integer流
     */
    static Seq<Integer> range(int n) {
        return range(0, n);
    }

    /**
     * 获取流的长度
     *
     * @return 流的长度
     */
    default Long count() {
        return (long) toList().size();
    }

    /**
     * 是否有指定的元素
     *
     * @param predicate
     * @return
     */
    @SuppressWarnings("all")
    default boolean some(Predicate<T> predicate) {
        return filter(predicate).toList().size() > 0;
    }
}
